Hazelcast একটি শক্তিশালী ইন-মেমরি ডিস্ট্রিবিউটেড ডেটা গ্রিড এবং ক্লাস্টার ব্যবস্থাপনা প্ল্যাটফর্ম যা ডিস্ট্রিবিউটেড ক্যাশিং, ডেটা স্টোরেজ, এবং কম্পিউটিং এর জন্য ব্যাপকভাবে ব্যবহৃত হয়। Hazelcast-এর অনেক অগ্রসর (advanced) বৈশিষ্ট্য রয়েছে যা উচ্চ কার্যকারিতা, স্কেলেবিলিটি এবং ফল্ট টলারেন্স নিশ্চিত করতে সাহায্য করে। এই টিউটোরিয়ালে আমরা Advanced Hazelcast Topics নিয়ে আলোচনা করব, যা উন্নত ব্যবহারকারীদের জন্য সাহায্য করবে।
Hazelcast-এর Querying ক্ষমতা একাধিক ডিস্ট্রিবিউটেড ডেটা স্ট্রাকচার (যেমন IMap
, IList
, ISet
) উপর কার্যকরী প্রশ্ন বা কুয়েরি চালানোর জন্য ব্যবহৃত হয়। এর মধ্যে কিছু উন্নত কুয়েরি কৌশল রয়েছে:
Hazelcast এ ডেটা খোঁজার জন্য ইনডেক্স ব্যবহার করা যায়, যা কুয়েরির পারফরম্যান্স অনেক উন্নত করে। IMap
এর উপর ইনডেক্স তৈরি করে আপনি কুয়েরি অপারেশনগুলির গতি বৃদ্ধি করতে পারেন।
IMap<String, Person> map = hz.getMap("personMap");
map.addIndex("age", false); // Index on 'age' field
// Query with index
Collection<Person> result = map.values(Predicates.greaterThan("age", 30));
Hazelcast Predicates ব্যবহারের মাধ্যমে বিভিন্ন শর্তের উপর কুয়েরি করতে পারে। এটি Equality, GreaterThan, LessThan, Like ইত্যাদি কুয়েরি শর্তগুলি প্রয়োগ করতে সহায়ক।
Predicate<String, Person> predicate = Predicates.equal("name", "John");
Collection<Person> result = map.values(predicate);
Hazelcast aggregation queries সমর্থন করে, যা আপনার ডেটা থেকে রিয়েল-টাইম রিপোর্ট বা স্ট্যাটিস্টিকস বের করতে সাহায্য করে।
AggregationResult result = map.aggregate(new Aggregators().avg("age"));
Hazelcast Jet হল Hazelcast এর ডিস্ট্রিবিউটেড কম্পিউটিং ইঞ্জিন, যা স্ট্রিমিং ডেটা প্রসেসিং এবং ব্যাচ কম্পিউটেশন সমর্থন করে। এটি real-time data processing এবং analytics এর জন্য উপযুক্ত।
Hazelcast Jet-এ Pipelines তৈরি করতে পারেন, যা ডিস্ট্রিবিউটেড কম্পিউটেশন চালানোর জন্য ব্যবহার করা হয়।
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("data.txt"))
.map(line -> line.split(","))
.filter(parts -> parts[0].equals("John"))
.writeTo(Sinks.files("output.txt"));
Hazelcast Jet স্ট্রিমিং ডেটা প্রসেসিং সমর্থন করে, যেখানে আপনি একটি ইনপুট স্ট্রিম থেকে ডেটা প্রসেস করতে পারেন।
Pipeline p = Pipeline.create();
p.readFrom(Sources.stream("sourceStream"))
.map(value -> value + " processed")
.writeTo(Sinks.stream("destinationStream"));
Hazelcast Jet ব্যাচ ডেটা প্রসেসিংয়ের জন্যও উপযুক্ত। আপনি একসাথে ডেটার একটি বড় সেট প্রসেস করতে পারেন।
BatchSource<Integer> source = Sources.batchFromIterable(Arrays.asList(1, 2, 3));
source.aggregate(Aggregators.sum());
Hazelcast ডিস্ট্রিবিউটেড ট্রানজেকশন সমর্থন করে, যা একাধিক নোডের মধ্যে অ্যাটমিক অপারেশন চালানোর জন্য ব্যবহার করা হয়। Transactional IMap, Transactional IList এবং অন্যান্য ডেটা স্ট্রাকচার ব্যবহার করে আপনি ট্রানজেকশন পরিচালনা করতে পারেন।
Hazelcast-এ ট্রানজেকশন শুরু করতে, TransactionContext এবং TransactionalMap ব্যবহার করা হয়।
TransactionContext context = hz.getTransactionContext();
Transaction tx = context.beginTransaction();
try {
IMap<String, String> map = context.getMap("map");
map.put("key", "value");
tx.commit(); // Commit the transaction
} catch (Exception e) {
tx.rollback(); // Rollback in case of failure
}
Hazelcast ক্লাস্টারে ডিস্ট্রিবিউটেড ট্রানজেকশন নিশ্চিত করার জন্য এটি 2PC (Two Phase Commit) প্রটোকল ব্যবহার করে।
WAN Replication হল একটি মেকানিজম যা একাধিক জিওগ্রাফিক্যাল অবস্থানে Hazelcast ক্লাস্টারের মধ্যে ডেটা সিঙ্ক্রোনাইজেশন নিশ্চিত করে। এটি ক্লাস্টারকে মুলত multi-region বা multi-data center সমর্থন করে, যেখানে ডেটা বিভিন্ন অঞ্চলের মধ্যে সিঙ্ক্রোনাইজ হয়।
Hazelcast WAN replication কনফিগার করার জন্য, আপনাকে WAN replication configuration সেটআপ করতে হবে। উদাহরণস্বরূপ:
<hazelcast>
<wan-replication>
<name>myReplication</name>
<endpoint>
<remote-address>192.168.1.100:5701</remote-address>
</endpoint>
</wan-replication>
</hazelcast>
এই কনফিগারেশন দ্বারা Hazelcast ডেটা ক্লাস্টারগুলির মধ্যে ডেটা রিপ্লিকেশন এবং সিঙ্ক্রোনাইজেশন করা হবে।
Hazelcast মাইক্রোসার্ভিস আর্কিটেকচারে গুরুত্বপূর্ণ ভূমিকা পালন করে, বিশেষ করে distributed caching, service discovery, এবং event-driven architecture এর জন্য।
Hazelcast ক্লাস্টারগুলি মাইক্রোসার্ভিস আর্কিটেকচারে service discovery এর জন্য কাজ করতে পারে। এটি স্বয়ংক্রিয়ভাবে সার্ভিসগুলির মধ্যে যোগাযোগ এবং লোড ব্যালান্সিং নিশ্চিত করে।
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ServiceDiscovery serviceDiscovery = hz.getServiceDiscovery();
serviceDiscovery.discover("my-service");
Hazelcast ডিস্ট্রিবিউটেড ক্যাশিং সমর্থন করে, যা মাইক্রোসার্ভিসের মধ্যে ডেটার শেয়ারিং এবং দ্রুত অ্যাক্সেস নিশ্চিত করে। উদাহরণস্বরূপ, একটি distributed cache:
IMap<String, String> cache = hz.getMap("myCache");
cache.put("key", "value");
এটি একাধিক মাইক্রোসার্ভিসে একই ক্যাশ ডেটা অ্যাক্সেস করতে সহায়ক।
Hazelcast-এ পারফরম্যান্স টিউনিংয়ের মাধ্যমে আপনি সিস্টেমের কার্যকারিতা এবং স্কেলেবিলিটি আরও উন্নত করতে পারেন। কিছু গুরুত্বপূর্ণ পদ্ধতি:
Hazelcast বিভিন্ন ক্লাউড প্ল্যাটফর্মের সাথে ইন্টিগ্রেট করা যায়, যেমন AWS, Azure, এবং Google Cloud। Hazelcast এর Cloud Discovery এবং Cloud Management ফিচার ব্যবহার করে আপনি সহজেই ক্লাউড পরিবেশে Hazelcast ক্লাস্টার পরিচালনা করতে পারেন।
Hazelcast AWS এ ব্যবহারের জন্য স্বয়ংক্রিয় ক্লাস্টার স্কেলিং এবং ডিসকভারি সমর্থন করে।
<hazelcast-cloud>
<cloud-aws>
<access-key>your-access-key</access-key>
<secret-key>your-secret-key</secret-key>
</cloud-aws>
</hazelcast-cloud>
Advanced Hazelcast Topics এর মধ্যে অনেক উন্নত ফিচার রয়েছে, যেমন Distributed Caching, WAN Replication, Transactional Data Structures, Hazelcast Jet, এবং Microservices Integration। এগুলি ব্যবহার করে আপনি ডিস্ট্রিবিউটেড অ্যাপ্লিকেশন এবং মাইক্রোসার্ভিস আর্কিটেকচারে পারফরম্যান্স এবং স্কেলেবিলিটি উন্নত করতে পারবেন। Hazelcast এর এই অগ্রসর বৈশিষ্ট্যগুলি ডিস্ট্রিবিউটেড সিস্টেমের জন্য শক্তিশালী এবং কার্যকরী সমাধান প্রদান করে।
Hazelcast একটি ডিস্ট্রিবিউটেড ডেটা গ্রিড যা distributed querying এর মাধ্যমে ডেটা দ্রুত এবং কার্যকরীভাবে অনুসন্ধান করার সুযোগ দেয়। যদিও Hazelcast সাধারণ predicate-based querying সাপোর্ট করে, তবে কিছু Advanced Querying Techniques রয়েছে যা বড় ডেটাসেটে আরও দ্রুত এবং দক্ষ অনুসন্ধান করতে সহায়ক।
এই টিউটোরিয়ালে আমরা Hazelcast-এ Advanced Querying Techniques এর বিভিন্ন পদ্ধতি আলোচনা করব, যার মাধ্যমে আপনি ডিস্ট্রিবিউটেড ডেটা সিস্টেমে আরও জটিল এবং দক্ষ অনুসন্ধান কার্যক্রম পরিচালনা করতে পারবেন।
Hazelcast Query Language (HQL) হল একটি শক্তিশালী কুয়েরি ল্যাঙ্গুয়েজ, যা ডিস্ট্রিবিউটেড ডেটা স্ট্রাকচারগুলির (যেমন IMap
, IList
, ISet
) মধ্যে ডেটা অনুসন্ধান এবং ফিল্টারিং করতে ব্যবহৃত হয়। এই ভাষা Predicates এর সাহায্যে সহজেই ডেটার উপর কুয়েরি করা যায়।
Predicate<Integer, String> predicate = Predicates.and(
Predicates.equal("name", "Hazelcast"),
Predicates.greaterThan("age", 30)
);
এখানে and
ব্যবহার করে দুইটি শর্তকে একত্রিত করা হয়েছে, যা ডেটার ফিল্টারিংয়ে সহায়ক।
Predicate<Integer, String> predicate = Predicates.not(
Predicates.equal("name", "Hazelcast")
);
এখানে not
ব্যবহার করে "name" ফিল্ডের মান "Hazelcast" নয় এমন ডেটা পাওয়া যাবে।
Hazelcast-এ Indexing হল একটি গুরুত্বপূর্ণ কৌশল যা কুয়েরি কর্মক্ষমতা বৃদ্ধি করতে সহায়ক। বিভিন্ন ধরনের ইনডেক্স ব্যবহার করে ডেটার দ্রুত অনুসন্ধান করা যায়।
Config config = new Config();
MapConfig mapConfig = new MapConfig("myMap");
// Create an index on 'age' field
MapIndexConfig indexConfig = new MapIndexConfig("age", true); // true for ordered index
mapConfig.addMapIndexConfig(indexConfig);
config.addMapConfig(mapConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
এখানে, age
ফিল্ডের উপর একটি সূচক তৈরি করা হয়েছে যা দ্রুত অনুসন্ধানে সাহায্য করবে।
Hazelcast-এ Aggregation Queries এর মাধ্যমে আপনি ডিস্ট্রিবিউটেড ডেটাতে sum, count, avg, min, max প্রভৃতি গণনা অপারেশন করতে পারেন। এটি খুবই উপকারী যখন আপনি বড় ডেটাসেট থেকে সারাংশ বা পরিসংখ্যান বের করতে চান।
IMap<Integer, String> map = hz.getMap("myMap");
// Example: Count how many entries have the value "Hazelcast"
Predicate<Integer, String> predicate = Predicates.equal("value", "Hazelcast");
long count = map.values(predicate).size();
System.out.println("Count of Hazelcast entries: " + count);
এখানে, একটি কুয়েরি ব্যবহার করে "Hazelcast" মানের এন্ট্রিগুলোর সংখ্যা গুন করা হয়েছে।
Hazelcast-এ Query Caching হল একটি পদ্ধতি যার মাধ্যমে পূর্বে করা কুয়েরির ফলাফল ক্যাশে রাখা হয়, যাতে পরবর্তীতে একই কুয়েরি করার সময় দ্রুত ফলাফল পাওয়া যায়। এটি পারফরম্যান্স বৃদ্ধির জন্য বিশেষভাবে কার্যকর।
MapConfig mapConfig = new MapConfig("myMap");
mapConfig.setQueryCacheEnabled(true);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
// Now, subsequent queries will be faster due to caching
এখানে, query cache সক্ষম করা হয়েছে, যাতে বারবার একই কুয়েরি করার সময় তা ক্যাশ থেকে দ্রুত প্রাপ্ত হয়।
Hazelcast এর Full-text Indexing এর মাধ্যমে আপনি টেক্সট ডেটার মধ্যে অনুসন্ধান করতে পারেন, যেমন কোন নির্দিষ্ট শব্দ বা প্যাটার্ন খোঁজা।
MapConfig mapConfig = new MapConfig("myMap");
MapIndexConfig indexConfig = new MapIndexConfig("description", true); // Full-text index
mapConfig.addMapIndexConfig(indexConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
// Now you can run full-text search queries
এখানে, description
ফিল্ডের উপর একটি পূর্ণপথ সূচক তৈরি করা হয়েছে, যা টেক্সট অনুসন্ধানকে দ্রুত করবে।
Hazelcast-এ pagination এর মাধ্যমে আপনি বড় ডেটা সেট থেকে নির্দিষ্ট পরিমাণ ডেটা পেতে পারেন। এটি ক্লাস্টারে বড় ডেটা অনুসন্ধান করার সময় অত্যন্ত সহায়ক।
Predicate<Integer, String> predicate = Predicates.equal("category", "electronics");
Query<Integer, String> query = map.query(predicate);
int pageSize = 10;
int currentPage = 1;
PagedIterator<Map.Entry<Integer, String>> iterator = query.iterator();
for (int i = 0; i < pageSize * currentPage; i++) {
iterator.next();
}
// Get the current page results
for (int i = 0; i < pageSize && iterator.hasNext(); i++) {
Map.Entry<Integer, String> entry = iterator.next();
System.out.println(entry.getKey() + ": " + entry.getValue());
}
এখানে, pagination ব্যবহার করে ডেটা অনুসন্ধান করতে একাধিক পৃষ্ঠা তৈরি করা হয়েছে এবং প্রতি পৃষ্ঠায় ১০টি ফলাফল দেখানো হচ্ছে।
Hazelcast-এ কাস্টম filters এবং aggregations ব্যবহার করে আরও শক্তিশালী কুয়েরি তৈরি করা যেতে পারে, যা complex queries এবং data processing এর জন্য সহায়ক।
Predicate<Integer, String> predicate = Predicates.and(
Predicates.greaterThan("price", 100),
Predicates.like("name", "Laptop%")
);
IMap<Integer, String> map = hz.getMap("myMap");
map.values(predicate).forEach(entry -> {
System.out.println(entry);
});
এখানে, একটি complex query তৈরি করা হয়েছে, যা "price" ফিল্ডের মান 100 এর বেশি এবং "name" ফিল্ডে "Laptop" শব্দটি থাকা এন্ট্রিগুলি নির্বাচন করবে।
Hazelcast Advanced Querying Techniques ব্যবহার করে আপনি complex queries, aggregation, full-text search, pagination, এবং query caching সহ অনেক শক্তিশালী অনুসন্ধান পদ্ধতি পরিচালনা করতে পারেন। Indexing এবং Predicate ব্যবহারের মাধ্যমে ডেটার কার্যকরী অনুসন্ধান করা সম্ভব, এবং aggregations এবং filters ব্যবহার করে আপনি বড় ডেটাসেটের উপর জটিল বিশ্লেষণ ও প্রসেসিং করতে পারবেন। Hazelcast-এর এই advanced querying কৌশলগুলি ডিস্ট্রিবিউটেড ডেটার মধ্যে আরও দ্রুত এবং দক্ষ অনুসন্ধান নিশ্চিত করে, যা স্কেলেবল সিস্টেমে অত্যন্ত গুরুত্বপূর্ণ।
Distributed Computing এবং Fault Tolerance হল দুটি গুরুত্বপূর্ণ ধারণা, যা আধুনিক ডিস্ট্রিবিউটেড সিস্টেম, যেমন Hazelcast-এ, কর্মক্ষমতা, স্থিতিশীলতা এবং স্কেলেবিলিটি নিশ্চিত করতে ব্যবহৃত হয়। এই দুটি বৈশিষ্ট্য সিস্টেমের পারফরম্যান্স বাড়ানোর পাশাপাশি ডেটা সুরক্ষা এবং অখণ্ডতা বজায় রাখতে সহায়ক।
এই টিউটোরিয়ালে, আমরা Distributed Computing এবং Fault Tolerance কীভাবে Hazelcast-এ কাজ করে, তার মৌলিক ধারণা, এবং তাদের বাস্তব প্রয়োগ সম্পর্কে আলোচনা করব।
Distributed Computing হল একটি কৌশল যেখানে একাধিক কম্পিউটার বা নোড একসাথে কাজ করে বড় আকারের সমস্যার সমাধান করতে। এটি মূলত parallel computing এর একটি রূপ, যেখানে কাজগুলো বিভিন্ন নোডে ভাগ করা হয় এবং একসাথে সম্পাদিত হয়। Hazelcast-এ, Distributed Computing প্রধানত data partitioning, distributed processing, এবং task execution এর মাধ্যমে করা হয়।
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ExecutorService executor = hz.getExecutorService("myExecutor");
// Define a task to run on all cluster nodes
Runnable task = new Runnable() {
public void run() {
System.out.println("Distributed task executed on: " + Thread.currentThread().getName());
}
};
// Execute the task on all nodes
executor.submitToAllMembers(task);
এখানে, ExecutorService ব্যবহার করে task সকল Hazelcast ক্লাস্টার নোডে পাঠানো হয়েছে এবং প্রত্যেক নোডে একসাথে কার্যকর হয়েছে।
Fault Tolerance হল এমন একটি বৈশিষ্ট্য যা নিশ্চিত করে যে, যদি কোনো নোড বা সার্ভার ব্যর্থ হয়, তবে সিস্টেমের অন্যান্য অংশগুলি স্বাভাবিকভাবে কাজ করে থাকবে এবং সিস্টেমের কোনো ক্ষতি বা ডাউনটাইম হবে না। Hazelcast-এ Fault Tolerance সাধারণত data replication এবং partition rebalancing এর মাধ্যমে নিশ্চিত করা হয়।
Config config = new Config();
MapConfig mapConfig = new MapConfig("myMap");
mapConfig.setBackupCount(1); // Set one backup for each partition
config.addMapConfig(mapConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
এখানে, setBackupCount(1)
দিয়ে আমরা প্রতিটি পার্টিশনের জন্য একটি ব্যাকআপ কনফিগার করেছি। ফলে, যদি কোনো পার্টিশন নষ্ট হয়, তার ব্যাকআপ অন্য নোডে পাওয়া যাবে এবং সিস্টেম ব্যাহত হবে না।
Hazelcast-এ Distributed Computing এবং Fault Tolerance একসাথে কাজ করে:
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
ExecutorService executor = hz.getExecutorService("myExecutor");
// Task definition with fault tolerance
Runnable task = new Runnable() {
public void run() {
// Perform computation across distributed nodes
System.out.println("Task is executing on node: " + Thread.currentThread().getName());
}
};
// Execute task on all nodes with fault tolerance
executor.submitToAllMembers(task);
এখানে, Hazelcast ExecutorService ব্যবহার করে ডিস্ট্রিবিউটেড টাস্কের মাধ্যমে কাজের ভারসাম্য নিশ্চিত করে, যাতে নোড ব্যর্থ হলেও কার্যক্রম চালু থাকে।
Distributed Computing এবং Fault Tolerance Hazelcast-এ সিস্টেমের স্কেলেবিলিটি, পারফরম্যান্স এবং স্থিতিশীলতা নিশ্চিত করতে অপরিহার্য। Hazelcast ডেটাকে পার্টিশন এবং রেপ্লিকেট করে ডিস্ট্রিবিউটেড কম্পিউটিং নিশ্চিত করে এবং ব্যাকআপ কনফিগারেশন ও ক্লাস্টার রিব্যালান্সিং এর মাধ্যমে Fault Tolerance বজায় রাখে। এই দুটি বৈশিষ্ট্য একসাথে কাজ করে, যাতে ডিস্ট্রিবিউটেড সিস্টেমে ডেটার অখণ্ডতা এবং কার্যক্ষমতা বজায় থাকে, পাশাপাশি সিস্টেম ব্যর্থ হলেও তার পুনরুদ্ধার সহজ হয়।
Hazelcast Jet হল একটি ডিস্ট্রিবিউটেড স্ট্রিমিং এবং বATCH প্রসেসিং প্ল্যাটফর্ম যা উচ্চ পারফরম্যান্স এবং স্কেলেবিলিটি সহ রিয়েল-টাইম ডেটা প্রসেসিং সমর্থন করে। এটি Hazelcast ডিস্ট্রিবিউটেড ডেটা গ্রিডের উপর ভিত্তি করে তৈরি এবং সম্পূর্ণরূপে ডিস্ট্রিবিউটেড পরিবেশে কার্যকরভাবে কাজ করে। Hazelcast Jet বড় পরিমাণ ডেটা সহজে প্রক্রিয়া করতে পারে, যা বিভিন্ন স্ট্রিমিং ডেটা উৎস (যেমন Kafka, RabbitMQ, বা HTTP) থেকে ডেটা গ্রহণ করে এবং সেই ডেটার উপর রিয়েল-টাইম বিশ্লেষণ এবং প্রক্রিয়াকরণ করতে ব্যবহৃত হয়।
Hazelcast Jet এর আর্কিটেকচার হল ডিস্ট্রিবিউটেড এবং পারালাল প্রসেসিং ব্যবস্থাপনা। এটি একটি master-worker model অনুসরণ করে, যেখানে:
Hazelcast Jet ক্লাস্টারের মধ্যে ডেটা পার্টিশন করে এবং সেই অনুযায়ী ডিস্ট্রিবিউটেড প্রক্রিয়াকরণ করে, যাতে স্কেলেবিলিটি এবং পারফরম্যান্স নিশ্চিত করা যায়।
Hazelcast Jet এ রিয়েল-টাইম ডেটা প্রক্রিয়াকরণ কাজগুলো Pipelines ব্যবহার করে করা হয়। Pipelines হল একধরনের ডেটা প্রবাহ যার মাধ্যমে ডেটা উৎস থেকে ইনপুট গ্রহণ করে প্রক্রিয়া করা হয় এবং আউটপুট হিসেবে ডেটা প্রদান করা হয়।
এখানে একটি সাধারণ স্ট্রিম প্রক্রিয়াকরণ উদাহরণ দেওয়া হল যেখানে ডেটার উপর নির্দিষ্ট অপারেশন প্রয়োগ করা হয় এবং আউটপুট প্রিন্ট করা হয়।
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.pipeline.*;
import com.hazelcast.jet.pipeline.test.TestSources;
public class StreamProcessingExample {
public static void main(String[] args) {
// Hazelcast Jet instance তৈরি
JetInstance jet = Jet.newJetInstance();
// Pipeline তৈরি করা
Pipeline p = Pipeline.create();
// Source: একটি স্ট্রিম উৎস (উদাহরণস্বরূপ একটি ইন-মেমরি উৎস)
p.readFrom(TestSources.items(1, 2, 3, 4, 5))
.map(i -> i * 2) // ডেটার প্রতি আইটেমে *2 করা
.writeTo(Sinks.logger()); // আউটপুট হিসেবে লগে লেখা
// Pipeline চালানো
jet.newJob(p).join();
// Hazelcast Jet instance বন্ধ করা
jet.shutdown();
}
}
এই উদাহরণে:
TestSources.items
: ইন-মেমরি উৎস থেকে ডেটা স্ট্রিম করা হচ্ছে।map(i -> i * 2)
: ডেটার প্রতিটি উপাদানে গুণ করা হচ্ছে।Sinks.logger()
: আউটপুট লগে লেখা হচ্ছে।এটি একটি খুব সাধারণ স্ট্রিমিং প্রক্রিয়াকরণের উদাহরণ, যা ইনপুট ডেটার প্রতি আইটেমকে প্রসেস করে এবং আউটপুট আকারে প্রিন্ট করে।
Hazelcast Jet Kafka-এর সাথে সংযোগ স্থাপন করতে পারে এবং রিয়েল-টাইম ডেটা সংগ্রহ করে প্রক্রিয়া করতে সক্ষম। নিচে একটি উদাহরণ দেওয়া হয়েছে যেখানে Kafka থেকে ডেটা নিয়ে প্রক্রিয়া করা হচ্ছে:
import com.hazelcast.jet.pipeline.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class KafkaStreamProcessing {
public static void main(String[] args) {
// Hazelcast Jet instance তৈরি
JetInstance jet = Jet.newJetInstance();
// Pipeline তৈরি করা
Pipeline p = Pipeline.create();
// Kafka থেকে ডেটা পাঠানো
p.readFrom(KafkaSources.kafka(
"my-topic", "localhost:9092",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
.map(msg -> msg.getKey() + ":" + msg.getValue())
.writeTo(Sinks.logger());
// Pipeline চালানো
jet.newJob(p).join();
// Hazelcast Jet instance বন্ধ করা
jet.shutdown();
}
}
এখানে:
KafkaSources.kafka()
: Kafka থেকে ডেটা নেওয়া হচ্ছে।map()
: Kafka message গুলোর উপর প্রক্রিয়াকরণ করা হচ্ছে (উদাহরণস্বরূপ, key এবং value একত্রিত করা)।Sinks.logger()
: লগে আউটপুট লেখা হচ্ছে।এই কোডটি Kafka থেকে ডেটা পড়ে এবং সেটি প্রক্রিয়া করে লগে আউটপুট দেয়।
Hazelcast Jet হল একটি শক্তিশালী, উচ্চ পারফরম্যান্স ডিস্ট্রিবিউটেড স্ট্রিমিং এবং ব্যাচ প্রসেসিং প্ল্যাটফর্ম যা ডেটা স্ট্রিমিং, রিয়েল-টাইম প্রক্রিয়াকরণ এবং ডিস্ট্রিবিউটেড ডেটা বিশ্লেষণে ব্যবহৃত হয়। এটি Kafka এবং অন্যান্য ডেটা উৎসের সাথে ইন্টিগ্রেট করা যায় এবং রিয়েল-টাইম ডেটা প্রক্রিয়াকরণ এবং বিশ্লেষণ সহজে করতে সক্ষম। Hazelcast Jet এর মাধ্যমে আপনি scalable, fault-tolerant, এবং real-time data pipeline তৈরি করতে পারেন।
common.read_more